// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*************************************************************************
 * @file OwnershipStrengthSubscriber.cpp
 * This file contains the implementation of the subscriber functions.
 *
 * This file was generated by the tool fastcdrgen.
 */

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>

#include "OwnershipStrengthSubscriber.h"

#include <algorithm>
#include <mutex>

using namespace eprosima::fastdds::dds;
using namespace eprosima::fastrtps::rtps;

OwnershipStrengthSubscriber::OwnershipStrengthSubscriber()
    : participant_(nullptr)
    , subscriber_(nullptr)
    , topic_(nullptr)
    , reader_(nullptr)
    , myType(new ExampleMessagePubSubType())
{
}

/**
 * Releases the DDS entities created by init(), children before parents:
 * the reader, then the topic and the subscriber, and finally the participant.
 *
 * If no participant was ever created (init() was not called or failed at its
 * first step) there is nothing to release and the destructor returns at once,
 * without touching the participant factory.
 */
OwnershipStrengthSubscriber::~OwnershipStrengthSubscriber()
{
    // Every other entity is created from participant_ (directly or through
    // subscriber_), so a null participant means nothing else can exist.
    if (participant_ == nullptr)
    {
        return;
    }

    if (reader_ != nullptr && subscriber_ != nullptr)
    {
        subscriber_->delete_datareader(reader_);
    }
    if (topic_ != nullptr)
    {
        participant_->delete_topic(topic_);
    }
    if (subscriber_ != nullptr)
    {
        participant_->delete_subscriber(subscriber_);
    }
    DomainParticipantFactory::get_instance()->delete_participant(participant_);
}

bool OwnershipStrengthSubscriber::init()
{
    //Create Participant
    DomainParticipantQos pqos;
    pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
    pqos.name("Participant_subscriber"); //You can put the name you want

    participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);
    if (participant_ == nullptr)
    {
        return false;
    }

    //Register the type
    myType.register_type(participant_);

    // Create Subscriber
    subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT);

    if (subscriber_ == nullptr)
    {
        return false;
    }

    //Create Topic
    topic_ = participant_->create_topic("OwnershipStrengthPubSubTopic", myType.get_type_name(), TOPIC_QOS_DEFAULT);

    if (topic_ == nullptr)
    {
        return false;
    }

    //Create DataReader
    DataReaderQos rqos;
    rqos.reliability().kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS;

    reader_ = subscriber_->create_datareader(topic_, rqos, &m_listener);

    if (reader_ == nullptr)
    {
        return false;
    }
    return true;
}

/**
 * Listener callback invoked when the set of publications matched with the
 * reader changes.
 *
 * Updates n_matched with the number of currently matched publications, logs
 * the event, and removes a lost publisher from the strength hierarchy.
 *
 * @param info Matching status; current_count_change tells whether a
 *             publication was matched (+1) or unmatched (-1), and
 *             last_publication_handle identifies the publication concerned.
 */
void OwnershipStrengthSubscriber::SubListener::on_subscription_matched(
        DataReader*,
        const SubscriptionMatchedStatus& info)
{
    if (info.current_count_change == 1)
    {
        // current_count is the number of publications matched right now;
        // total_count is cumulative and never decreases.
        n_matched = info.current_count;
        std::cout << "Subscriber matched." << std::endl;
    }
    else if (info.current_count_change == -1)
    {
        n_matched = info.current_count;
        std::cout << "Subscriber unmatched." << std::endl;
    }
    else
    {
        std::cout << info.current_count_change
                  << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl;
    }

    // A lost publisher must be deregistered to make sure messages from weaker
    // publishers are accepted afterwards. A newly matched publisher has nothing
    // to remove, so this is done only when the matched count went down.
    // It is also possible to deregister on a lost deadline, if applicable.
    if (info.current_count_change < 0)
    {
        m_hierarchy.DeregisterPublisher(GUID_t(info.last_publication_handle));
    }
}

/**
 * Records the writer of a sample in the strength hierarchy and decides whether
 * the sample comes from the current owner.
 *
 * The owner is the writer with the lowest GUID among the writers registered
 * at the highest strength that still has at least one registered writer.
 *
 * @param st   Received message; its ownershipStrength() is the writer's strength.
 * @param info Sample information; provides the GUID of the writer.
 * @return true if the sample was sent by the current owner, false otherwise
 *         (in which case a "weak message" notice is printed).
 */
bool OwnershipStrengthSubscriber::StrengthHierarchy::IsMessageStrong(
        const ExampleMessage& st,
        const SampleInfo& info)
{
    const unsigned int ownershipStrength = st.ownershipStrength();
    const GUID_t guid = info.sample_identity.writer_guid();

    bool strong = false;
    {
        // Scoped lock: the mutex is released even if the insertion throws.
        std::lock_guard<decltype(mapMutex)> lock(mapMutex);

        // The strength-GUID pair is inserted in the strength hierarchy, if unique.
        strengthMap[ownershipStrength].insert(guid);

        // Walk from the highest strength downwards and stop at the first level
        // that still has writers. Levels left empty by deregistered publishers
        // must be skipped: taking begin() of an empty set is undefined behaviour
        // and would keep weaker writers from ever becoming the owner.
        for (auto level = strengthMap.rbegin(); level != strengthMap.rend(); ++level)
        {
            if (!level->second.empty())
            {
                // In case of a tie, the writer with the lowest GUID is arbitrarily prioritised.
                strong = (guid == *(level->second.begin()));
                break;
            }
        }
    }

    if (!strong)
    {
        std::cout << "Weak message received and discarded (strength " << ownershipStrength << ")" << std::endl;
    }

    return strong;
}

/**
 * Removes a publisher from every level of the strength hierarchy.
 *
 * Levels that end up without any writer are erased as well, so the highest
 * entry of the map always refers to at least one registered writer and the
 * map does not accumulate empty levels.
 *
 * @param guid GUID of the publisher to remove. Unknown GUIDs are ignored.
 */
void OwnershipStrengthSubscriber::StrengthHierarchy::DeregisterPublisher(
        GUID_t guid)
{
    // Scoped lock: released on every exit path.
    std::lock_guard<decltype(mapMutex)> lock(mapMutex);

    // We walk through the hierarchy and remove the publisher GUID
    for (auto level = strengthMap.begin(); level != strengthMap.end();)
    {
        level->second.erase(guid);

        if (level->second.empty())
        {
            // erase() returns the iterator following the removed element.
            level = strengthMap.erase(level);
        }
        else
        {
            ++level;
        }
    }
}

/**
 * Listener callback invoked when the reader has data available.
 *
 * Takes the next sample from the reader. If the sample carries valid data and
 * the strength hierarchy accepts it as coming from the owning writer, the
 * message counter is incremented and the message is printed.
 *
 * @param reader Reader from which the sample is taken.
 */
void OwnershipStrengthSubscriber::SubListener::on_data_available(
        DataReader* reader)
{
    SampleInfo sample_info;
    ExampleMessage sample;

    const bool sample_taken = (reader->take_next_sample(&sample, &sample_info) == ReturnCode_t::RETCODE_OK);
    if (!sample_taken)
    {
        return;
    }

    // The hierarchy is only consulted for samples that carry valid data.
    if (!sample_info.valid_data || !m_hierarchy.IsMessageStrong(sample, sample_info))
    {
        return;
    }

    // User message handling here, for a strong message
    ++n_msg;
    std::cout << "Message received with index " << n_msg
              << ", and strength " << sample.ownershipStrength()
              << ", reading \"" << sample.message() << "\""
              << std::endl;
}

/**
 * Blocks the calling thread until the user presses Enter (or standard input
 * reaches end-of-file), printing a notice before and after the wait.
 */
void OwnershipStrengthSubscriber::run()
{
    std::cout << "Waiting for Data, press Enter to stop the Subscriber. " << std::endl;

    // Discard a single character from standard input; this is what makes the call wait.
    std::cin.ignore(1);

    std::cout << "Shutting down the Subscriber." << std::endl;
}
